Change watches: operations block until everyone has acked.
authorcl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk>
Tue, 26 Jul 2005 15:20:09 +0000 (15:20 +0000)
committercl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk>
Tue, 26 Jul 2005 15:20:09 +0000 (15:20 +0000)
Watch events are no longer sent to self
Watches no longer take a priority
async and asyncwait commands for xs_test, now we need to continue
despite blocking ops.
Print test name at end of verbose run on failure.
Use --trace-file arg to xenstored when testing
Signed-off-by: Rusty Russel <rusty@rustcorp.com.au>
Signed-off-by: Christian Limpach <Christian.Limpach@cl.cam.ac.uk>
16 files changed:
tools/xenstore/TODO
tools/xenstore/testsuite/07watch.sh
tools/xenstore/testsuite/08transaction.sh
tools/xenstore/testsuite/10domain-homedir.sh
tools/xenstore/testsuite/11domain-watch.sh
tools/xenstore/testsuite/12readonly.sh
tools/xenstore/testsuite/13watch-ack.sh
tools/xenstore/testsuite/test.sh
tools/xenstore/xenstored_core.c
tools/xenstore/xenstored_core.h
tools/xenstore/xenstored_transaction.c
tools/xenstore/xenstored_watch.c
tools/xenstore/xenstored_watch.h
tools/xenstore/xs.c
tools/xenstore/xs.h
tools/xenstore/xs_test.c

index 8e4185b211d662655c293dedf081621c2ad64fdc..71d5bbbf507a631dbb6366de5d0a1e3443d52522 100644 (file)
@@ -2,8 +2,9 @@ TODO in no particular order.  Some of these will never be done.  There
 are omissions of important but necessary things.  It is up to the
 reader to fill in the blanks.
 
-- Remove calls to system() from daemon
 - Timeout failed watch responses
-- Dynamic nodes
+- Dynamic/supply nodes
 - Persistant storage of introductions, watches and transactions, so daemon can restart
 - Remove assumption that rename doesn't fail
+- Multi-root transactions, for setting up front and back ends at same time.
+
index 88496d55e939569cf463c735adaaee917eb605cf..00af679a295ada8a14e5a9f724990fbff2b7a32b 100644 (file)
@@ -3,45 +3,52 @@
 # Watch something, write to it, check watch has fired.
 [ "`echo -e 'write /test create contents' | ./xs_test 2>&1`" = "" ]
 
-[ "`echo -e '1 watch /test token 100
-2 write /test create contents2
+[ "`echo -e '1 watch /test token
+2 async write /test create contents2
 1 waitwatch
 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test:token" ]
 
 # Check that reads don't set it off.
-[ "`echo -e '1 watch /test token 100
+[ "`echo -e '1 watch /test token
 2 read /test
 1 waitwatch' | ./xs_test 2>&1`" = "2:contents2
 1:waitwatch timeout" ]
 
 # mkdir, setperm and rm should (also tests watching dirs)
 [ "`echo -e 'mkdir /dir' | ./xs_test 2>&1`" = "" ]
-[ "`echo -e '1 watch /dir token 100
-2 mkdir /dir/newdir
+[ "`echo -e '1 watch /dir token
+2 async mkdir /dir/newdir
 1 waitwatch
 1 ackwatch token
-2 setperm /dir/newdir 0 READ
+asyncwait
+2 async setperm /dir/newdir 0 READ
 1 waitwatch
 1 ackwatch token
-2 rm /dir/newdir
+asyncwait
+2 async rm /dir/newdir
 1 waitwatch
 1 ackwatch token' | ./xs_test 2>&1`" = "1:/dir/newdir:token
 1:/dir/newdir:token
 1:/dir/newdir:token" ]
 
+# We don't get a watch from our own commands.
+[ "`echo -e 'watch /dir token
+mkdir /dir/newdir
+waitwatch' | ./xs_test 2>&1`" = "waitwatch timeout" ]
+
 # ignore watches while doing commands, should work.
-[ "`echo -e 'watch /dir token 100
-write /dir/test create contents
+[ "`echo -e 'watch /dir token
+1 async write /dir/test create contents
 read /dir/test
 waitwatch
 ackwatch token' | ./xs_test 2>&1`" = "contents
 /dir/test:token" ]
 
-# watch priority /test.
-[ "`echo -e '1 watch /dir token1 1
-3 watch /dir token3 3
-2 watch /dir token2 2
-write /dir/test create contents
+# watch priority test: all simultaneous
+[ "`echo -e '1 watch /dir token1
+3 watch /dir token3
+2 watch /dir token2
+async write /dir/test create contents
 3 waitwatch
 3 ackwatch token3
 2 waitwatch
@@ -52,9 +59,9 @@ write /dir/test create contents
 1:/dir/test:token1" ]
 
 # If one dies (without acking), the other should still get ack.
-[ "`echo -e '1 watch /dir token1 0
-2 watch /dir token2 1
-write /dir/test create contents
+[ "`echo -e '1 watch /dir token1
+2 watch /dir token2
+async write /dir/test create contents
 2 waitwatch
 2 close
 1 waitwatch
@@ -62,51 +69,52 @@ write /dir/test create contents
 1:/dir/test:token1" ]
 
 # If one dies (without reading at all), the other should still get ack.
-[ "`echo -e '1 watch /dir token1 0
-2 watch /dir token2 1
-write /dir/test create contents
+[ "`echo -e '1 watch /dir token1
+2 watch /dir token2
+async write /dir/test create contents
 2 close
 1 waitwatch
 1 ackwatch token1' | ./xs_test 2>&1`" = "1:/dir/test:token1" ]
 
 # unwatch
-[ "`echo -e '1 watch /dir token1 0
+[ "`echo -e '1 watch /dir token1
 1 unwatch /dir token1
-1 watch /dir token2 0
-2 write /dir/test2 create contents
+1 watch /dir token2
+2 async write /dir/test2 create contents
 1 waitwatch
 1 unwatch /dir token2' | ./xs_test 2>&1`" = "1:/dir/test2:token2" ]
 
 # unwatch while watch pending.  Next watcher gets the event.
-[ "`echo -e '1 watch /dir token1 0
-2 watch /dir token2 1
-write /dir/test create contents
+[ "`echo -e '1 watch /dir token1
+2 watch /dir token2
+async write /dir/test create contents
 2 unwatch /dir token2
 1 waitwatch
 1 ackwatch token1' | ./xs_test 2>&1`" = "1:/dir/test:token1" ]
 
 # unwatch while watch pending.  Should clear this so we get next event.
-[ "`echo -e '1 watch /dir token1 0
-write /dir/test create contents
+[ "`echo -e '1 watch /dir token1
+async write /dir/test create contents
 1 unwatch /dir token1
-1 watch /dir/test token2 0
-write /dir/test none contents2
+1 watch /dir/test token2
+asyncwait
+async write /dir/test none contents2
 1 waitwatch
 1 ackwatch token2' | ./xs_test 2>&1`" = "1:/dir/test:token2" ]
 
 # check we only get notified once.
-[ "`echo -e '1 watch /test token 100
-2 write /test create contents2
+[ "`echo -e '1 watch /test token
+2 async write /test create contents2
 1 waitwatch
 1 ackwatch token
 1 waitwatch' | ./xs_test 2>&1`" = "1:/test:token
 1:waitwatch timeout" ]
 
 # watches are queued in order.
-[ "`echo -e '1 watch / token 100
-2 write /test1 create contents
-2 write /test2 create contents
-2 write /test3 create contents
+[ "`echo -e '1 watch / token
+async 2 write /test1 create contents
+async 2 write /test2 create contents
+async 2 write /test3 create contents
 1 waitwatch
 1 ackwatch token
 1 waitwatch
@@ -117,9 +125,9 @@ write /dir/test none contents2
 1:/test3:token" ]
 
 # Creation of subpaths should be covered correctly.
-[ "`echo -e '1 watch / token 100
-2 write /test/subnode create contents2
-2 write /test/subnode/subnode create contents2
+[ "`echo -e '1 watch / token
+2 async write /test/subnode create contents2
+2 async write /test/subnode/subnode create contents2
 1 waitwatch
 1 ackwatch token
 1 waitwatch
@@ -129,23 +137,23 @@ write /dir/test none contents2
 1:waitwatch timeout" ]
 
 # Watch event must have happened before we registered interest.
-[ "`echo -e '1 watch / token 100
-2 write /test/subnode create contents2
-2 watch / token2 0
+[ "`echo -e '1 watch / token
+2 async write /test/subnode create contents2
+1 watch / token2 0
 1 waitwatch
 1 ackwatch token
-2 waitwatch' | ./xs_test 2>&1`" = "1:/test/subnode:token
-2:waitwatch timeout" ]
+1 waitwatch' | ./xs_test 2>&1`" = "1:/test/subnode:token
+1:waitwatch timeout" ]
 
 # Rm fires notification on child.
-[ "`echo -e '1 watch /test/subnode token 100
-2 rm /test
+[ "`echo -e '1 watch /test/subnode token
+2 async rm /test
 1 waitwatch
 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/subnode:token" ]
 
 # Watch should not double-send after we ack, even if we did something in between.
-[ "`echo -e '1 watch /test2 token 100
-2 write /test2/foo create contents2
+[ "`echo -e '1 watch /test2 token
+2 async write /test2/foo create contents2
 1 waitwatch
 1 read /test2/foo
 1 ackwatch token
index c5311ed8225d58b79401b97144fe57ebefb474df..4c786df68756a7d4e57ebd0e5d4b61850019c467 100644 (file)
@@ -45,37 +45,37 @@ echo write /test/entry1 create contents | ./xs_test
 sleep 1
 rm /test/entry1
 commit
-dir /test' | ./xs_test`" = "" ]
+dir /test' | ./xs_test --no-timeout`" = "" ]
 
 # ... as long as noone is waiting.
 [ "`echo -e '1 start /test
 2 mkdir /test/dir
 1 mkdir /test/dir
 1 dir /test
-1 commit' | ./xs_test 2>&1`" = "1:dir
+1 commit' | ./xs_test --no-timeout 2>&1`" = "1:dir
 FATAL: 1: commit: Connection timed out" ]
 
 # Events inside transactions don't trigger watches until (successful) commit.
-[ "`echo -e '1 watch /test token 100
+[ "`echo -e '1 watch /test token
 2 start /test
 2 mkdir /test/dir/sub
 1 waitwatch' | ./xs_test 2>&1`" = "1:waitwatch timeout" ]
-[ "`echo -e '1 watch /test token 100
+[ "`echo -e '1 watch /test token
 2 start /test
 2 mkdir /test/dir/sub
 2 abort
 1 waitwatch' | ./xs_test 2>&1`" = "1:waitwatch timeout" ]
-[ "`echo -e '1 watch /test token 100
+[ "`echo -e '1 watch /test token
 2 start /test
 2 mkdir /test/dir/sub
-2 commit
+2 async commit
 1 waitwatch
 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/dir/sub:token" ]
 
 # Rm inside transaction works like rm outside: children get notified.
-[ "`echo -e '1 watch /test/dir/sub token 100
+[ "`echo -e '1 watch /test/dir/sub token
 2 start /test
 2 rm /test/dir
-2 commit
+2 async commit
 1 waitwatch
 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/dir/sub:token" ]
index 398adb562cb70f5153dec99b33cc0a871b0c7d98..39f03b2ebcf0574142f5f1b806f0f9e277cffb9d 100644 (file)
@@ -13,8 +13,8 @@ entry1" ]
 # Place a watch using a relative path: expect relative answer.
 [ "`echo 'introduce 1 100 7 /home
 1 mkdir foo
-1 watch foo token 0
-write /home/foo/bar create contents
+1 watch foo token
+async write /home/foo/bar create contents
 1 waitwatch
 1 ackwatch token' | ./xs_test 2>&1`" = "handle is 1
 1:foo/bar:token" ]
index f42fb5f8c6fb753271a2b44a85c5736c371206d8..6793244bca864b07148227172350035af059e90d 100644 (file)
@@ -6,42 +6,46 @@
 [ "`echo -e 'mkdir /dir' | ./xs_test 2>&1`" = "" ]
 
 [ "`echo -e 'introduce 1 100 7 /my/home
-1 watch /test token 100
-write /test create contents2
+1 watch /test token
+async write /test create contents2
 1 waitwatch
 1 ackwatch token
 1 unwatch /test token
+asyncwait
 release 1' | ./xs_test 2>&1`" = "handle is 1
 1:/test:token" ]
 
 # ignore watches while doing commands, should work.
 [ "`echo -e 'introduce 1 100 7 /my/home
-1 watch /dir token 100
-1 write /dir/test create contents
-1 read /dir/test
+1 watch /dir token
+async write /dir/test create contents
+1 write /dir/test2 create contents2
+1 write /dir/test3 create contents3
+1 write /dir/test4 create contents4
 1 waitwatch
 1 ackwatch token
+asyncwait
 release 1' | ./xs_test 2>&1`" = "handle is 1
-1:contents
 1:/dir/test:token" ]
 
 # unwatch
 [ "`echo -e 'introduce 1 100 7 /my/home
-1 watch /dir token1 0
+1 watch /dir token1
 1 unwatch /dir token1
-1 watch /dir token2 0
-2 write /dir/test2 create contents
+1 watch /dir token2
+async 2 write /dir/test2 create contents
 1 waitwatch
 1 unwatch /dir token2
+asyncwait
 release 1' | ./xs_test 2>&1`" = "handle is 1
 1:/dir/test2:token2" ]
 
 # unwatch while watch pending.
 [ "`echo -e 'introduce 1 100 7 /my/home
 introduce 2 101 8 /my/secondhome
-1 watch /dir token1 0
-2 watch /dir token2 1
-write /dir/test create contents
+1 watch /dir token1
+2 watch /dir token2
+3 async write /dir/test create contents
 2 unwatch /dir token2
 1 waitwatch
 1 ackwatch token1
index bfe6273fe11f25646a897ec2edd3dc61ac650782..646d95b32a66a02935dedf5b72c923a6de626112 100644 (file)
@@ -9,7 +9,7 @@ tool" ]
 
 [ "`echo 'read /test
 getperm /test
-watch /test token 0
+watch /test token
 unwatch /test token 
 start /
 commit
@@ -27,7 +27,7 @@ abort' | ./xs_test --readonly 2>&1`" = "contents
 
 # Check that watches work like normal.
 set -m
-[ "`echo 'watch / token 0
+[ "`echo 'watch / token
 waitwatch
 ackwatch token' | ./xs_test --readonly 2>&1`" = "/test:token" ] &
 
@@ -36,6 +36,3 @@ if wait; then :; else
     echo Readonly wait test failed: $?
     exit 1
 fi
-    
-    
-
index 70a7c6fa3ec98be3fa5befcc218eb09598621a1d..30125fa6af36ea9707944c7fc2bcbc0fe197a1ee 100644 (file)
@@ -15,8 +15,9 @@ echo mkdir /test/3 | ./xs_test
 [ "`echo '1 watch /test/1 token1 0
 1 watch /test/2 token2 0
 1 watch /test/3 token3 0
-2 write /test/2 create contents2
+2 async write /test/2 create contents2
 1 waitwatch
-2 write /test/1 create contents1
-2 write /test/3 create contents3
-1 ackwatch token2' | ./xs_test 2>&1`" = "1:/test/2:token2" ]
+3 async write /test/1 create contents1
+4 async write /test/3 create contents3
+1 ackwatch token2
+1 close' | ./xs_test 2>&1`" = "1:/test/2:token2" ]
index 3f0055842df3ce233101ea9e40aa0262ffb181c6..4cd550a28e3fab59cb2643ef01beb764edd54fca 100755 (executable)
@@ -9,7 +9,7 @@ run_test()
     mkdir $XENSTORED_ROOTDIR
 # Weird failures with this.
     if type valgrind >/dev/null 2>&1; then
-       valgrind -q --logfile-fd=3 ./xenstored_test --output-pid --no-fork 3>testsuite/tmp/vgout > /tmp/pid 2> testsuite/tmp/xenstored_errors &
+       valgrind -q --logfile-fd=3 ./xenstored_test --output-pid --trace-file=testsuite/tmp/trace --no-fork 3>testsuite/tmp/vgout > /tmp/pid 2> testsuite/tmp/xenstored_errors &
        while [ ! -s /tmp/pid ]; do sleep 0; done
        PID=`cat /tmp/pid`
        rm /tmp/pid
@@ -38,7 +38,9 @@ for f in testsuite/[0-9]*.sh; do
        echo Test $f passed...
     else
        echo Test $f failed, running verbosely...
-       run_test $f -x
+       run_test $f -x || true
+       # That will have filled the screen, repeat message.
+       echo Test $f failed
        exit 1
     fi
 done
index f6f6e71cfc14a9ea8613b446df738c0197473679..386d323d37e325da8f05f2c6218df232ab84f4aa 100644 (file)
@@ -51,7 +51,7 @@
 #include "xenstored_domain.h"
 
 static bool verbose;
-static LIST_HEAD(connections);
+LIST_HEAD(connections);
 static int tracefd = -1;
 
 #ifdef TESTING
@@ -959,8 +959,11 @@ static void do_write(struct connection *conn, struct buffered_data *in)
        }
 
        add_change_node(conn->transaction, node, false);
+       if (fire_watches(conn, node, false)) {
+               conn->watch_ack = XS_WRITE;
+               return;
+       }
        send_ack(conn, XS_WRITE);
-       fire_watches(conn->transaction, node, false);
 }
 
 static void do_mkdir(struct connection *conn, const char *node)
@@ -985,8 +988,11 @@ static void do_mkdir(struct connection *conn, const char *node)
        }
 
        add_change_node(conn->transaction, node, false);
+       if (fire_watches(conn, node, false)) {
+               conn->watch_ack = XS_MKDIR;
+               return;
+       }
        send_ack(conn, XS_MKDIR);
-       fire_watches(conn->transaction, node, false);
 }
 
 static void do_rm(struct connection *conn, const char *node)
@@ -1023,8 +1029,11 @@ static void do_rm(struct connection *conn, const char *node)
        }
 
        add_change_node(conn->transaction, node, true);
+       if (fire_watches(conn, node, true)) {
+               conn->watch_ack = XS_RM;
+               return;
+       }
        send_ack(conn, XS_RM);
-       fire_watches(conn->transaction, node, true);
 }
 
 static void do_get_perms(struct connection *conn, const char *node)
@@ -1095,8 +1104,11 @@ static void do_set_perms(struct connection *conn, struct buffered_data *in)
        }
 
        add_change_node(conn->transaction, node, false);
+       if (fire_watches(conn, node, false)) {
+               conn->watch_ack = XS_SET_PERMS;
+               return;
+       }
        send_ack(conn, XS_SET_PERMS);
-       fire_watches(conn->transaction, node, false);
 }
 
 /* Process "in" for conn: "in" will vanish after this conversation, so
@@ -1321,14 +1333,23 @@ static void unblock_connections(void)
        struct connection *i, *tmp;
 
        list_for_each_entry_safe(i, tmp, &connections, list) {
-               if (i->state == OK)
-                       continue;
-
-               if (!transaction_covering_node(i->blocked_by)) {
-                       talloc_free(i->blocked_by);
-                       i->blocked_by = NULL;
-                       i->state = OK;
-                       consider_message(i);
+               switch (i->state) {
+               case BLOCKED:
+                       if (!transaction_covering_node(i->blocked_by)) {
+                               talloc_free(i->blocked_by);
+                               i->blocked_by = NULL;
+                               i->state = OK;
+                               consider_message(i);
+                       }
+                       break;
+               case WATCHED:
+                       if (i->watches_unacked == 0) {
+                               i->state = OK;
+                               send_ack(i, i->watch_ack);
+                       }
+                       break;
+               case OK:
+                       break;
                }
        }
 
@@ -1351,6 +1372,8 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read)
 
        new->state = OK;
        new->blocked_by = NULL;
+       new->watch_ack = XS_ERROR;
+       new->watches_unacked = 0;
        new->out = new->waiting_reply = NULL;
        new->fd = -1;
        new->id = 0;
@@ -1359,6 +1382,7 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read)
        new->write = write;
        new->read = read;
        new->can_write = true;
+       INIT_LIST_HEAD(&new->watches);
 
        talloc_set_fail_handler(out_of_mem, &talloc_fail);
        if (setjmp(talloc_fail)) {
@@ -1430,13 +1454,12 @@ void dump_connection(void)
                printf("    state = %s\n",
                       i->state == OK ? "OK"
                       : i->state == BLOCKED ? "BLOCKED"
+                      : i->state == WATCHED ? "WATCHED"
                       : "INVALID");
                if (i->id)
                        printf("    id = %i\n", i->id);
                if (i->blocked_by)
                        printf("    blocked on = %s\n", i->blocked_by);
-               if (i->waiting_for_ack)
-                       printf("    waiting_for_ack TRUE\n");
                if (!i->in->inhdr || i->in->used)
                        printf("    got %i bytes of %s\n",
                               i->in->used, i->in->inhdr ? "header" : "data");
index 75a9bfe0a62c98c97ced0ad33dbbb713c0a62c5f..61d47b53425677ddacd7f89be073b8ee43dc055c 100644 (file)
@@ -51,6 +51,8 @@ enum state
 {
        /* Blocked by transaction. */
        BLOCKED,
+       /* Waiting for watchers to ack event we caused */
+       WATCHED,
        /* Completed */
        OK,
 };
@@ -71,6 +73,12 @@ struct connection
        /* Node we are waiting for (if state == BLOCKED) */
        char *blocked_by;
 
+       /* Are we waiting for watches to be acked from an event we caused? */
+       unsigned int watches_unacked;
+
+       /* Type of ack to send once watches fired. */
+       enum xsd_sockmsg_type watch_ack;
+
        /* Is this a read-only connection? */
        bool can_write;
 
@@ -92,10 +100,14 @@ struct connection
        /* The domain I'm associated with, if any. */
        struct domain *domain;
 
+       /* My watches. */
+       struct list_head watches;
+
        /* Methods for communicating over this connection: write can be NULL */
        connwritefn_t *write;
        connreadfn_t *read;
 };
+extern struct list_head connections;
 
 /* Return length of string (including nul) at this offset. */
 unsigned int get_string(const struct buffered_data *data,
index 60dcf04130165eae83be750c035a144e5f03da37..afaef1bef2f935dd7acf72e3414e9e03cbb101d2 100644 (file)
@@ -288,7 +288,6 @@ void do_transaction_start(struct connection *conn, const char *node)
 static bool commit_transaction(struct transaction *trans)
 {
        char *tmp, *dir;
-       struct changed_node *i;
 
        /* Move: orig -> .old, repl -> orig.  Cleanup deletes .old. */
        dir = node_dir_outside_transaction(trans->node);
@@ -301,15 +300,15 @@ static bool commit_transaction(struct transaction *trans)
                        trans->divert, dir);
 
        trans->divert = tmp;
-
-       /* Fire off the watches for everything that changed. */
-       list_for_each_entry(i, &trans->changes, list)
-               fire_watches(NULL, i->node, i->recurse);
        return true;
 }
 
 void do_transaction_end(struct connection *conn, const char *arg)
 {
+       struct changed_node *i;
+       struct transaction *trans;
+       bool fired = false;
+
        if (!arg || (!streq(arg, "T") && !streq(arg, "F"))) {
                send_error(conn, EINVAL);
                return;
@@ -320,24 +319,30 @@ void do_transaction_end(struct connection *conn, const char *arg)
                return;
        }
 
+       /* Set to NULL so fire_watches sends events. */
+       trans = conn->transaction;
+       conn->transaction = NULL;
+       /* Attach transaction to arg for auto-cleanup */
+       talloc_steal(arg, trans);
+
        if (streq(arg, "T")) {
-               if (conn->transaction->destined_to_fail) {
+               if (trans->destined_to_fail) {
                        send_error(conn, ETIMEDOUT);
-                       goto failed;
+                       return;
                }
-               if (!commit_transaction(conn->transaction)) {
+               if (!commit_transaction(trans)) {
                        send_error(conn, errno);
-                       goto failed;
+                       return;
                }
-       }
 
-       talloc_free(conn->transaction);
-       conn->transaction = NULL;
-       send_ack(conn, XS_TRANSACTION_END);
-       return;
+               /* Fire off the watches for everything that changed. */
+               list_for_each_entry(i, &trans->changes, list)
+                       fired |= fire_watches(conn, i->node, i->recurse);
+       }
 
-failed:
-       talloc_free(conn->transaction);
-       conn->transaction = NULL;
+       if (fired)
+               conn->watch_ack = XS_TRANSACTION_END;
+       else
+               send_ack(conn, XS_TRANSACTION_END);
 }
 
index 205b70399cac8cfa522998b672cea1fc2a4e1463..c532da26a846b1aa6ddd3eaeabe2ffa217e19594 100644 (file)
 #include "xenstored_domain.h"
 
 /* FIXME: time out unacked watches. */
-
-/* We create this if anyone is interested "node", then we pass it from
- * watch to watch as each connection acks it.
- */
 struct watch_event
 {
-       /* The watch we are firing for (watch->events) */
+       /* The events on this watch. */
        struct list_head list;
 
-       /* Watches we need to fire for (watches[0]->events == this). */
-       struct watch **watches;
-       unsigned int num_watches;
-
-       struct timeval timeout;
-
-       /* Name of node which changed. */
-       char *node;
+       /* Data to send (node\0token\0). */
+       unsigned int len;
+       char *data;
 
-       /* For remove, we trigger on all the children of this node too. */
-       bool recurse;
+       /* Connection which caused watch event (which we are blocking) */
+       struct connection *cause;
 };
 
 struct watch
 {
+       /* Watches on this connection */
        struct list_head list;
-       unsigned int priority;
 
        /* Current outstanding events applying to this watch. */
        struct list_head events;
 
        /* Is this relative to connnection's implicit path? */
-       bool relative;
+       const char *relative_path;
 
        char *token;
        char *node;
-       struct connection *conn;
 };
-static LIST_HEAD(watches);
-
-static struct watch_event *get_first_event(struct connection *conn)
-{
-       struct watch *watch;
-       struct watch_event *event;
-
-       /* Find first watch with an event. */
-       list_for_each_entry(watch, &watches, list) {
-               if (watch->conn != conn)
-                       continue;
-
-               event = list_top(&watch->events, struct watch_event, list);
-               if (event)
-                       return event;
-       }
-       return NULL;
-}
 
 /* Look through our watches: if any of them have an event, queue it. */
 void queue_next_event(struct connection *conn)
 {
        struct watch_event *event;
-       const char *node;
-       char *buffer;
-       unsigned int len;
+       struct watch *watch;
 
        /* We had a reply queued already?  Send it: other end will
         * discard watch. */
@@ -110,170 +80,93 @@ void queue_next_event(struct connection *conn)
        if (conn->waiting_for_ack)
                return;
 
-       event = get_first_event(conn);
-       if (!event)
-               return;
-
-       /* If we decide to cancel, we will reset this. */
-       conn->waiting_for_ack = event->watches[0];
-
-       /* If we deleted /foo and they're watching /foo/bar, that's what we
-        * tell them has changed. */
-       if (!is_child(event->node, event->watches[0]->node)) {
-               assert(event->recurse);
-               node = event->watches[0]->node;
-       } else
-               node = event->node;
-
-       /* If watch placed using relative path, give them relative answer. */
-       if (event->watches[0]->relative) {
-               node += strlen(get_implicit_path(conn));
-               if (node[0] == '/') /* Could be "". */
-                       node++;
-       }
-
-       /* Create reply from path and token */
-       len = strlen(node) + 1 + strlen(event->watches[0]->token) + 1;
-       buffer = talloc_array(conn, char, len);
-       strcpy(buffer, node);
-       strcpy(buffer+strlen(node)+1, event->watches[0]->token);
-       send_reply(conn, XS_WATCH_EVENT, buffer, len);
-       talloc_free(buffer);
-}
-
-static struct watch **find_watches(const char *node, bool recurse,
-                                  unsigned int *num)
-{
-       struct watch *i;
-       struct watch **ret = NULL;
-
-       *num = 0;
-
-       /* We include children too if this is an rm. */
-       list_for_each_entry(i, &watches, list) {
-               if (is_child(node, i->node) ||
-                   (recurse && is_child(i->node, node))) {
-                       (*num)++;
-                       ret = talloc_realloc(node, ret, struct watch *, *num);
-                       ret[*num - 1] = i;
+       list_for_each_entry(watch, &conn->watches, list) {
+               event = list_top(&watch->events, struct watch_event, list);
+               if (event) {
+                       conn->waiting_for_ack = watch;
+                       send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
+                       break;
                }
        }
-       return ret;
 }
 
-/* FIXME: we fail to fire on out of memory.  Should drop connections. */
-void fire_watches(struct transaction *trans, const char *node, bool recurse)
+static int destroy_watch_event(void *_event)
 {
-       struct watch **watches;
-       struct watch_event *event;
-       unsigned int num_watches;
+       struct watch_event *event = _event;
 
-       /* During transactions, don't fire watches. */
-       if (trans)
-               return;
-
-       watches = find_watches(node, recurse, &num_watches);
-       if (!watches)
-               return;
-
-       /* Create and fill in info about event. */
-       event = talloc(talloc_autofree_context(), struct watch_event);
-       event->node = talloc_strdup(event, node);
-
-       /* Tie event to this watch. */
-       event->watches = watches;
-       talloc_steal(event, watches);
-       event->num_watches = num_watches;
-       event->recurse = recurse;
-       list_add_tail(&event->list, &watches[0]->events);
-
-       /* Warn if not finished after thirty seconds. */
-       gettimeofday(&event->timeout, NULL);
-       event->timeout.tv_sec += 30;
-
-       /* If connection not doing anything, queue this. */
-       if (!watches[0]->conn->out)
-               queue_next_event(watches[0]->conn);
+       trace_destroy(event, "watch_event");
+       assert(event->cause->watches_unacked != 0);
+       /* If it hits zero, will unblock in unblock_connections. */
+       event->cause->watches_unacked--;
+       return 0;
 }
 
-/* We're done with this event: see if anyone else wants it. */
-static void move_event_onwards(struct watch_event *event)
+static void add_event(struct connection *cause, struct watch *watch,
+                     const char *node)
 {
-       list_del(&event->list);
+       struct watch_event *event;
 
-       event->num_watches--;
-       event->watches++;
-       if (!event->num_watches) {
-               talloc_free(event);
-               return;
+       if (watch->relative_path) {
+               node += strlen(watch->relative_path);
+               if (*node == '/') /* Could be "" */
+                       node++;
        }
 
-       list_add_tail(&event->list, &event->watches[0]->events);
-
-       /* If connection not doing anything, queue this. */
-       if (!event->watches[0]->conn->out)
-               queue_next_event(event->watches[0]->conn);
+       event = talloc(watch, struct watch_event);
+       event->len = strlen(node) + 1 + strlen(watch->token) + 1;
+       event->data = talloc_array(event, char, event->len);
+       strcpy(event->data, node);
+       strcpy(event->data + strlen(node) + 1, watch->token);
+       event->cause = cause;
+       cause->watches_unacked++;
+       talloc_set_destructor(event, destroy_watch_event);
+       list_add_tail(&event->list, &watch->events);
+       trace_create(event, "watch_event");
 }
 
-static void remove_watch_from_events(struct watch *dying_watch)
+/* FIXME: we fail to fire on out of memory.  Should drop connections. */
+bool fire_watches(struct connection *conn, const char *node, bool recurse)
 {
+       struct connection *i;
        struct watch *watch;
-       struct watch_event *event;
-       unsigned int i;
 
-       list_for_each_entry(watch, &watches, list) {
-               list_for_each_entry(event, &watch->events, list) {
-                       for (i = 0; i < event->num_watches; i++) {
-                               if (event->watches[i] != dying_watch)
-                                       continue;
-
-                               assert(i != 0);
-                               memmove(event->watches+i,
-                                       event->watches+i+1,
-                                       (event->num_watches - (i+1))
-                                       * sizeof(struct watch *));
-                               event->num_watches--;
-                       }
+       /* During transactions, don't fire watches. */
+       if (conn->transaction)
+               return false;
+
+       assert(conn->state == OK);
+
+       /* Create an event for each watch.  Don't send to self. */
+       list_for_each_entry(i, &connections, list) {
+               if (i == conn)
+                       continue;
+
+               list_for_each_entry(watch, &i->watches, list) {
+                       if (is_child(node, watch->node))
+                               add_event(conn, watch, node);
+                       else if (recurse && is_child(watch->node, node))
+                               add_event(conn, watch, watch->node);
+                       else
+                               continue;
+                       conn->state = WATCHED;
+                       /* If connection not doing anything, queue this. */
+                       if (!i->out)
+                               queue_next_event(i);
                }
        }
+       return conn->state == WATCHED;
 }
 
 static int destroy_watch(void *_watch)
 {
-       struct watch *watch = _watch;
-       struct watch_event *event;
-
-       /* If we have pending events, pass them on to others. */
-       while ((event = list_top(&watch->events, struct watch_event, list)))
-               move_event_onwards(event);
-
-       /* Remove from global list. */
-       list_del(&watch->list);
-
-       /* Other events which match this watch must be cleared. */
-       remove_watch_from_events(watch);
-
-       trace_destroy(watch, "watch");
+       trace_destroy(_watch, "watch");
        return 0;
 }
 
-/* We keep watches in priority order. */
-static void insert_watch(struct watch *watch)
-{
-       struct watch *i;
-
-       list_for_each_entry(i, &watches, list) {
-               if (i->priority <= watch->priority) {
-                       list_add_tail(&watch->list, &i->list);
-                       return;
-               }
-       }
-
-       list_add_tail(&watch->list, &watches);
-}
-
 void shortest_watch_ack_timeout(struct timeval *tv)
 {
+       (void)tv;
+#if 0 /* FIXME */
        struct watch *watch;
 
        list_for_each_entry(watch, &watches, list) {
@@ -285,10 +178,12 @@ void shortest_watch_ack_timeout(struct timeval *tv)
                                *tv = i->timeout;
                }
        }
+#endif
 }      
 
 void check_watch_ack_timeout(void)
 {
+#if 0
        struct watch *watch;
        struct timeval now;
 
@@ -308,12 +203,13 @@ void check_watch_ack_timeout(void)
                        }
                }
        }
+#endif
 }
 
 void do_watch(struct connection *conn, struct buffered_data *in)
 {
        struct watch *watch;
-       char *vec[3];
+       char *vec[2];
        bool relative;
 
        if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec)) {
@@ -331,14 +227,16 @@ void do_watch(struct connection *conn, struct buffered_data *in)
        watch = talloc(conn, struct watch);
        watch->node = talloc_strdup(watch, vec[0]);
        watch->token = talloc_strdup(watch, vec[1]);
-       watch->conn = conn;
-       watch->priority = strtoul(vec[2], NULL, 0);
-       watch->relative = relative;
+       if (relative)
+               watch->relative_path = get_implicit_path(conn);
+       else
+               watch->relative_path = NULL;
+
        INIT_LIST_HEAD(&watch->events);
 
-       insert_watch(watch);
-       talloc_set_destructor(watch, destroy_watch);
+       list_add_tail(&watch->list, &conn->watches);
        trace_create(watch, "watch");
+       talloc_set_destructor(watch, destroy_watch);
        send_ack(conn, XS_WATCH);
 }
 
@@ -356,9 +254,6 @@ void do_watch_ack(struct connection *conn, const char *token)
                return;
        }
 
-       event = list_top(&conn->waiting_for_ack->events,
-                        struct watch_event, list);
-       assert(event->watches[0] == conn->waiting_for_ack);
        if (!streq(conn->waiting_for_ack->token, token)) {
                /* They're confused: this will cause us to send event again */
                conn->waiting_for_ack = NULL;
@@ -366,7 +261,12 @@ void do_watch_ack(struct connection *conn, const char *token)
                return;
        }
 
-       move_event_onwards(event);
+       /* Remove event: after ack sent, core will call queue_next_event */
+       event = list_top(&conn->waiting_for_ack->events, struct watch_event,
+                        list);
+       list_del(&event->list);
+       talloc_free(event);
+
        conn->waiting_for_ack = NULL;
        send_ack(conn, XS_WATCH_ACK);
 }
@@ -385,11 +285,9 @@ void do_unwatch(struct connection *conn, struct buffered_data *in)
         * watch we're deleting: conn->waiting_for_ack was reset by
         * this command in consider_message anyway. */
        node = canonicalize(conn, vec[0]);
-       list_for_each_entry(watch, &watches, list) {
-               if (watch->conn != conn)
-                       continue;
-
+       list_for_each_entry(watch, &conn->watches, list) {
                if (streq(watch->node, node) && streq(watch->token, vec[1])) {
+                       list_del(&watch->list);
                        talloc_free(watch);
                        send_ack(conn, XS_UNWATCH);
                        return;
@@ -404,15 +302,16 @@ void dump_watches(struct connection *conn)
        struct watch *watch;
        struct watch_event *event;
 
-       /* Find first watch with an event. */
-       list_for_each_entry(watch, &watches, list) {
-               if (watch->conn != conn)
-                       continue;
+       if (conn->waiting_for_ack)
+               printf("    waiting_for_ack for watch on %s token %s\n",
+                      conn->waiting_for_ack->node,
+                      conn->waiting_for_ack->token);
 
-               printf("    watch on %s token %s prio %i\n",
-                      watch->node, watch->token, watch->priority);
+       list_for_each_entry(watch, &conn->watches, list) {
+               printf("    watch on %s token %s\n",
+                      watch->node, watch->token);
                list_for_each_entry(event, &watch->events, list)
-                       printf("        event: %s\n", event->node);
+                       printf("        event: %s\n", event->data);
        }
 }
 #endif
index c1ab41d8660cc9a65d67eeef745519097c733675..d1fac70502c32b6147640d27e6697b2e46789cc8 100644 (file)
@@ -32,8 +32,10 @@ bool is_watch_event(struct connection *conn, struct buffered_data *out);
 /* Look through our watches: if any of them have an event, queue it. */
 void queue_next_event(struct connection *conn);
 
-/* Fire all watches: recurse means all the children are effected (ie. rm) */
-void fire_watches(struct transaction *trans, const char *node, bool recurse);
+/* Fire all watches: recurse means all the children are effected (ie. rm).
+ * Returns true if there were any, meaning connection has to wait.
+ */
+bool fire_watches(struct connection *conn, const char *node, bool recurse);
 
 /* Find shortest timeout: if any, reduce tv (may already be set). */
 void shortest_watch_ack_timeout(struct timeval *tv);
index c11e02ae1e210bfcbc22dfe18a683bbd7a4be918..a1d667747a180392e474f7c04a4152120e4e8da8 100644 (file)
@@ -401,22 +401,16 @@ unwind:
 /* Watch a node for changes (poll on fd to detect, or call read_watch()).
  * When the node (or any child) changes, fd will become readable.
  * Token is returned when watch is read, to allow matching.
- * Priority indicates order if multiple watchers: higher is first.
  * Returns false on failure.
  */
-bool xs_watch(struct xs_handle *h, const char *path, const char *token,
-             unsigned int priority)
+bool xs_watch(struct xs_handle *h, const char *path, const char *token)
 {
-       char prio[MAX_STRLEN(priority)];
-       struct iovec iov[3];
+       struct iovec iov[2];
 
-       sprintf(prio, "%u", priority);
        iov[0].iov_base = (void *)path;
        iov[0].iov_len = strlen(path) + 1;
        iov[1].iov_base = (void *)token;
        iov[1].iov_len = strlen(token) + 1;
-       iov[2].iov_base = prio;
-       iov[2].iov_len = strlen(prio) + 1;
 
        return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
 }
index 8779a6b33a58d25a6007081c3e2ba09aa9b058b7..1daf7b0150c7e0a37d4735d90db9f34004b09c72 100644 (file)
@@ -82,11 +82,9 @@ bool xs_set_permissions(struct xs_handle *h, const char *path,
 /* Watch a node for changes (poll on fd to detect, or call read_watch()).
  * When the node (or any child) changes, fd will become readable.
  * Token is returned when watch is read, to allow matching.
- * Priority indicates order if multiple watchers: higher is first.
  * Returns false on failure.
  */
-bool xs_watch(struct xs_handle *h, const char *path, const char *token,
-             unsigned int priority);
+bool xs_watch(struct xs_handle *h, const char *path, const char *token);
 
 /* Return the FD to poll on to see if a watch has fired. */
 int xs_fileno(struct xs_handle *h);
index 6ce5d701af5a354d3e921944ab9c8a81ee1afa27..71719365dd74320a0a4ed0dbb3865c41ea009a55 100644 (file)
@@ -20,6 +20,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/types.h>
+#include <sys/wait.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <signal.h>
 #define XSTEST
 
 static struct xs_handle *handles[10] = { NULL };
+static unsigned int children;
+
+static bool timeout = true;
+static bool readonly = false;
 
 struct ringbuf_head
 {
@@ -173,7 +178,9 @@ static void __attribute__((noreturn)) usage(void)
             "  getperm <path>\n"
             "  setperm <path> <id> <flags> ...\n"
             "  shutdown\n"
-            "  watch <path> <token> <prio>\n"
+            "  watch <path> <token>\n"
+            "  async <command>...\n"
+            "  asyncwait\n"
             "  waitwatch\n"
             "  ackwatch <token>\n"
             "  unwatch <path> <token>\n"
@@ -186,22 +193,34 @@ static void __attribute__((noreturn)) usage(void)
             "  dump\n");
 }
 
-static char *arg(char *line, unsigned int num)
+static int argpos(const char *line, unsigned int num)
 {
-       static char *args[10];
-       unsigned int i, len = 0;
+       unsigned int i, len = 0, off = 0;
 
        for (i = 0; i <= num; i++) {
-               line += len;
-               line += strspn(line, " \t\n");
-               len = strcspn(line, " \t\n");
+               off += len;
+               off += strspn(line + off, " \t\n");
+               len = strcspn(line + off, " \t\n");
                if (!len)
-                       barf("Can't get arg %u", num);
+                       return off;
        }
+       return off;
+}
+
+static char *arg(char *line, unsigned int num)
+{
+       static char *args[10];
+       unsigned int off, len;
+
+       off = argpos(line, num);
+       len = strcspn(line + off, " \t\n");
+
+       if (!len)
+               barf("Can't get arg %u", num);
 
        free(args[num]);
        args[num] = malloc(len + 1);
-       memcpy(args[num], line, len);
+       memcpy(args[num], line+off, len);
        args[num][len] = '\0';
        return args[num];
 }
@@ -360,10 +379,9 @@ static void do_shutdown(unsigned int handle)
                failed(handle);
 }
 
-static void do_watch(unsigned int handle, const char *node, const char *token,
-                    const char *pri)
+static void do_watch(unsigned int handle, const char *node, const char *token)
 {
-       if (!xs_watch(handles[handle], node, token, atoi(pri)))
+       if (!xs_watch(handles[handle], node, token))
                failed(handle);
 }
 
@@ -388,6 +406,47 @@ static void do_ackwatch(unsigned int handle, const char *token)
                failed(handle);
 }
 
+/* Async wait for watch on handle */
+static void do_command(unsigned int default_handle, char *line);
+static void do_async(unsigned int handle, char *line)
+{
+       int child;
+       unsigned int i;
+       children++;
+       if ((child = fork()) != 0)
+               return;
+
+       /* Don't keep other handles open in parent. */
+       for (i = 0; i < ARRAY_SIZE(handles); i++) {
+               if (handles[i] && i != handle) {
+                       xs_daemon_close(handles[i]);
+                       handles[i] = NULL;
+               }
+       }
+
+       do_command(handle, line + argpos(line, 1));
+       exit(0);
+}
+
+static void do_asyncwait(unsigned int handle)
+{
+       int status;
+
+       if (handle)
+               barf("handle has no meaning with asyncwait");
+
+       if (children == 0)
+               barf("No children to wait for!");
+
+       if (waitpid(0, &status, 0) > 0) {
+               if (!WIFEXITED(status))
+                       barf("async died");
+               if (WEXITSTATUS(status))
+                       exit(WEXITSTATUS(status));
+       }
+       children--;
+}
+
 static void do_unwatch(unsigned int handle, const char *node, const char *token)
 {
        if (!xs_unwatch(handles[handle], node, token))
@@ -533,23 +592,106 @@ static void dump(int handle)
        free(subdirs);
 }
 
+static int handle;
+
+static void alarmed(int sig __attribute__((unused)))
+{
+       if (handle) {
+               char handlename[10];
+               sprintf(handlename, "%u:", handle);
+               write(STDOUT_FILENO, handlename, strlen(handlename));
+       }
+       write(STDOUT_FILENO, command, strlen(command));
+       write(STDOUT_FILENO, " timeout\n", strlen(" timeout\n"));
+       exit(1);
+}
+
+static void do_command(unsigned int default_handle, char *line)
+{
+       char *endp;
+
+       if (strspn(line, " \n") == strlen(line))
+               return;
+       if (strstarts(line, "#"))
+               return;
+
+       handle = strtoul(line, &endp, 10);
+       if (endp != line)
+               memmove(line, endp+1, strlen(endp));
+       else
+               handle = default_handle;
+
+       if (!handles[handle]) {
+               if (readonly)
+                       handles[handle] = xs_daemon_open_readonly();
+               else
+                       handles[handle] = xs_daemon_open();
+               if (!handles[handle])
+                       barf_perror("Opening connection to daemon");
+       }
+       command = arg(line, 0);
+
+       if (timeout)
+               alarm(5);
+
+       if (streq(command, "dir"))
+               do_dir(handle, arg(line, 1));
+       else if (streq(command, "read"))
+               do_read(handle, arg(line, 1));
+       else if (streq(command, "write"))
+               do_write(handle,
+                        arg(line, 1), arg(line, 2), arg(line, 3));
+       else if (streq(command, "setid"))
+               do_setid(handle, arg(line, 1));
+       else if (streq(command, "mkdir"))
+               do_mkdir(handle, arg(line, 1));
+       else if (streq(command, "rm"))
+               do_rm(handle, arg(line, 1));
+       else if (streq(command, "getperm"))
+               do_getperm(handle, arg(line, 1));
+       else if (streq(command, "setperm"))
+               do_setperm(handle, arg(line, 1), line);
+       else if (streq(command, "shutdown"))
+               do_shutdown(handle);
+       else if (streq(command, "watch"))
+               do_watch(handle, arg(line, 1), arg(line, 2));
+       else if (streq(command, "waitwatch"))
+               do_waitwatch(handle);
+       else if (streq(command, "async"))
+               do_async(handle, line);
+       else if (streq(command, "asyncwait"))
+               do_asyncwait(handle);
+       else if (streq(command, "ackwatch"))
+               do_ackwatch(handle, arg(line, 1));
+       else if (streq(command, "unwatch"))
+               do_unwatch(handle, arg(line, 1), arg(line, 2));
+       else if (streq(command, "close")) {
+               xs_daemon_close(handles[handle]);
+               handles[handle] = NULL;
+       } else if (streq(command, "start"))
+               do_start(handle, arg(line, 1));
+       else if (streq(command, "commit"))
+               do_end(handle, false);
+       else if (streq(command, "abort"))
+               do_end(handle, true);
+       else if (streq(command, "introduce"))
+               do_introduce(handle, arg(line, 1), arg(line, 2),
+                            arg(line, 3), arg(line, 4));
+       else if (streq(command, "release"))
+               do_release(handle, arg(line, 1));
+       else if (streq(command, "dump"))
+               dump(handle);
+       else if (streq(command, "sleep"))
+               sleep(atoi(arg(line, 1)));
+       else
+               barf("Unknown command %s", command);
+       fflush(stdout);
+       alarm(0);
+}
+
 int main(int argc, char *argv[])
 {
        char line[1024];
-       bool readonly = false, timeout = true;
-       int handle;
-
-       static void alarmed(int sig __attribute__((unused)))
-       {
-               if (handle) {
-                       char handlename[10];
-                       sprintf(handlename, "%u:", handle);
-                       write(STDOUT_FILENO, handlename, strlen(handlename));
-               }
-               write(STDOUT_FILENO, command, strlen(command));
-               write(STDOUT_FILENO, " timeout\n", strlen(" timeout\n"));
-               exit(1);
-       }
 
        if (argc > 1 && streq(argv[1], "--readonly")) {
                readonly = true;
@@ -557,7 +699,7 @@ int main(int argc, char *argv[])
                argv++;
        }
 
-       if (argc > 1 && streq(argv[1], "--notimeout")) {
+       if (argc > 1 && streq(argv[1], "--no-timeout")) {
                timeout = false;
                argc--;
                argv++;
@@ -570,81 +712,10 @@ int main(int argc, char *argv[])
        ringbuf_datasize = getpagesize() / 2 - sizeof(struct ringbuf_head);
 
        signal(SIGALRM, alarmed);
-       while (fgets(line, sizeof(line), stdin)) {
-               char *endp;
+       while (fgets(line, sizeof(line), stdin))
+               do_command(0, line);
 
-               if (strspn(line, " \n") == strlen(line))
-                       continue;
-               if (strstarts(line, "#"))
-                       continue;
-
-               handle = strtoul(line, &endp, 10);
-               if (endp != line)
-                       memmove(line, endp+1, strlen(endp));
-               else
-                       handle = 0;
-
-               if (!handles[handle]) {
-                       if (readonly)
-                               handles[handle] = xs_daemon_open_readonly();
-                       else
-                               handles[handle] = xs_daemon_open();
-                       if (!handles[handle])
-                               barf_perror("Opening connection to daemon");
-               }
-               command = arg(line, 0);
-
-               if (timeout)
-                       alarm(5);
-               if (streq(command, "dir"))
-                       do_dir(handle, arg(line, 1));
-               else if (streq(command, "read"))
-                       do_read(handle, arg(line, 1));
-               else if (streq(command, "write"))
-                       do_write(handle,
-                                arg(line, 1), arg(line, 2), arg(line, 3));
-               else if (streq(command, "setid"))
-                       do_setid(handle, arg(line, 1));
-               else if (streq(command, "mkdir"))
-                       do_mkdir(handle, arg(line, 1));
-               else if (streq(command, "rm"))
-                       do_rm(handle, arg(line, 1));
-               else if (streq(command, "getperm"))
-                       do_getperm(handle, arg(line, 1));
-               else if (streq(command, "setperm"))
-                       do_setperm(handle, arg(line, 1), line);
-               else if (streq(command, "shutdown"))
-                       do_shutdown(handle);
-               else if (streq(command, "watch"))
-                       do_watch(handle, arg(line, 1), arg(line, 2), arg(line, 3));
-               else if (streq(command, "waitwatch"))
-                       do_waitwatch(handle);
-               else if (streq(command, "ackwatch"))
-                       do_ackwatch(handle, arg(line, 1));
-               else if (streq(command, "unwatch"))
-                       do_unwatch(handle, arg(line, 1), arg(line, 2));
-               else if (streq(command, "close")) {
-                       xs_daemon_close(handles[handle]);
-                       handles[handle] = NULL;
-               } else if (streq(command, "start"))
-                       do_start(handle, arg(line, 1));
-               else if (streq(command, "commit"))
-                       do_end(handle, false);
-               else if (streq(command, "abort"))
-                       do_end(handle, true);
-               else if (streq(command, "introduce"))
-                       do_introduce(handle, arg(line, 1), arg(line, 2),
-                                    arg(line, 3), arg(line, 4));
-               else if (streq(command, "release"))
-                       do_release(handle, arg(line, 1));
-               else if (streq(command, "dump"))
-                       dump(handle);
-               else if (streq(command, "sleep"))
-                       sleep(atoi(arg(line, 1)));
-               else
-                       barf("Unknown command %s", command);
-               fflush(stdout);
-               alarm(0);
-       }
+       while (children)
+               do_asyncwait(0);
        return 0;
 }